PIT - Práctica 1: Detección de Actividad de Voz (I)

Alicia Lozano Díez

22 de abril de 2021

Objetivo

El objetivo de esta práctica es proporcionar una introducción al procesamiento de señales temporales como es el caso de la señal de voz, y desarrollar de un detector de actividad de voz basado en redes neuronales recurrentes.

Materiales

CUIDADO: * Los datos proporcionados son de uso exclusivo para esta práctica. No tiene permiso para copiar, distribuir o utilizar el corpus para ningún otro propósito.

1. Introducción al procesamiento de señales temporales

1.1. Descarga de ficheros de ejemplo

Primero vamos a descargar el audio de ejemplo de Moodle (audio_sample.wav) y ejecutar las siguientes líneas de código, que nos permitirán subir el archivo a Google Colab desde el disco local:

In [ ]:
from google.colab import files
uploaded = files.upload()
Upload widget is only available when the cell has been executed in the current browser session. Please rerun this cell to enable.
Saving audio_sample.wav to audio_sample.wav

Una vez cargado el fichero de audio, podemos escucharlo de la siguiente manera:

In [ ]:
import IPython

wav_file_name = "audio_sample.wav"
print(wav_file_name)
IPython.display.Audio(wav_file_name)
audio_sample.wav
Out[ ]:

1.2. Lectura y representación de audio en Python

A continuación vamos a definir ciertas funciones para poder hacer manejo de ficheros de audio en Python.

Comenzamos definiendo una función read_recording que leerá un fichero de audio WAV, normalizará la amplitud y devolverá el vector de muestras signal y su frecuencia de muestreo fs.

In [ ]:
import scipy.io.wavfile

def read_recording(wav_file_name): 
  fs, signal = scipy.io.wavfile.read(wav_file_name)
  signal = signal/max(abs(signal)) # normalizes amplitude
  
  return fs, signal

Si ejecutamos la función anterior para el fichero de ejemplo, podemos ver la forma en la que se carga dicho fichero de audio en Python. Así, podemos obtener la frecuencia de muestreo y la longitud del fichero en número de muestras:

In [ ]:
fs, signal = read_recording(wav_file_name) 
print("Signal variable shape: " + str(signal.shape))
print("Sample rate: " + str(fs))
print("File length: " + str(len(signal)) + " samples")
Signal variable shape: (67072,)
Sample rate: 16000
File length: 67072 samples

PREGUNTAS:

  • ¿Como obtendría la duración de la señal en segundos?

También podemos representar la señal y ver su forma de onda. Para ello, definimos la función plot_signal como sigue:

In [ ]:
import matplotlib.pyplot as plt
import numpy as np

def plot_signal(signal, fs, ylabel="", title=""):
  dur = len(signal)/fs
  step = 1./fs
  t_axis = np.arange(0., dur, step)

  plt.plot(t_axis, signal)
  plt.xlim([0, dur])
  plt.ylabel(ylabel)
  plt.xlabel('Time (seconds)')
  plt.title(title)
  plt.grid(True)

Y utilizando la función anterior, obtenemos su representación (amplitud frente al tiempo):

In [ ]:
plot_signal(signal, fs, "Amplitude", wav_file_name)
plt.show()

PREGUNTAS:

  • Incluya en el informe la representación obtenida para uno de los audios.

1.3. Representación de etiquetas de actividad de voz

En esta práctica, vamos a desarrollar un detector de actividad de voz, que determinará qué segmentos de la señal de voz son realmente voz y cuáles silencio.

Por ello, vamos a ver dos ejemplos de etiquetas ground truth, que corresponden al fichero de audio de ejemplo.

Primero, descargamos de Moodle las etiquetas de voz/silencio que están en los ficheros audio_sample_labels_1.voz y audio_sample_labels_2.voz y las cargamos en Google Colab como en el caso anterior.

In [ ]:
from google.colab import files
uploaded = files.upload()
Upload widget is only available when the cell has been executed in the current browser session. Please rerun this cell to enable.
Saving audio_sample_labels_2.voz to audio_sample_labels_2.voz
Saving audio_sample_labels_1.voz to audio_sample_labels_1.voz

Estas etiquetas están guardadas en ficheros de texto y podemos cargarlas en Python de la siguiente manera:

In [ ]:
labels_file_name = 'audio_sample_labels_1.voz'
voice_labels = np.loadtxt(labels_file_name)

Con el siguiente código, podemos representar la señal de voz así como sus etiquetas en la misma figura:

In [ ]:
plot_signal(signal, fs)
plot_signal(voice_labels*2-1, fs, "Amplitude", wav_file_name)
plt.show()

Las etiquetas de voz/silencio provienen de distintos detectores de actividad de voz.

In [ ]:
labels_file_name = 'audio_sample_labels_2.voz'
voice_labels_2 = np.loadtxt(labels_file_name)
In [ ]:
plot_signal(signal, fs)
plot_signal(voice_labels_2*2-1, fs, "Amplitude", wav_file_name)
plt.show()
In [ ]:
print(f"% de voz en el primer etiquetado: {100*np.mean(voice_labels):.2f}%")
print(f"% de voz en el segundo etiquetado: {100*np.mean(voice_labels_2):.2f}%")
% de voz en el primer etiquetado: 76.26%
% de voz en el segundo etiquetado: 84.79%

PREGUNTAS:

  • ¿Qué valores tienen las etiquetas? ¿Qué significan dichos valores?
  • ¿Por qué se representa _voicelabels*2-1?
  • Represente la señal de voz junto con las etiquetas para ambos audios e incluya las figuras en el informe de la práctica. ¿Qué diferencias observas? ¿A qué se puede deber?
  • ¿Qué cantidad de voz/silencio hay en cada etiquetado?

1.4. Extracción de características

En la mayoría de sistemas de reconocimiento de patrones, un primer paso es la extracción de características. Esto consiste, a grandes rasgos, en obtener una representación de los datos de entrada, que serán utilizados para un posterior modelado.

En nuestro caso, vamos pasar de la señal en crudo "raw" dada por las muestras (signal), a una secuencia de vectores de características que extraigan información a corto plazo de la misma y la representen. Esta sería la entrada a nuestro sistema de detección de voz basado en redes neuronales.

Para ver algunos ejemplos, vamos a utilizar la librería librosa (https://librosa.org/doc/latest/index.html).

Dentro de esta librería, tenemos funciones para extraer distintos tipos de características de la señal de voz, como por ejemplo el espectrograma en escala Mel (melspectrogram).

Estas características a corto plazo, se extraen en ventanas de unos pocos milisegundos con o sin solapamiento.

Un ejemplo sería el siguiente:

In [ ]:
import librosa

mel_spec = librosa.feature.melspectrogram(
    signal, fs, n_mels=23, win_length=320, hop_length=160)

print(mel_spec.shape)
print(signal.shape)
(23, 420)
(67072,)
In [ ]:
from librosa.display import specshow

S_DB = librosa.power_to_db(mel_spec, ref=np.max)
specshow(S_DB, sr=fs, hop_length=160, x_axis='time', y_axis='mel');
plt.colorbar(format='%+2.0f dB');

PREGUNTAS:

  • ¿Qué se obtiene de la función anterior?
  • ¿Qué significan los valores de los parámetros _winlength y _hoplength?
  • ¿Qué dimensiones de _melspec obtienes? ¿Qué significan?

De esta manera, podríamos obtener una parametrización de las señales para ser utilizadas como entrada a nuestra red neuronal.

Para los siguientes apartados, se proporcionan los vectores de características MFCC para una serie de audios que se utilizarán como conjunto de entrenamiento del modelo de VAD.

2. Detector de actividad de voz (Voice Activity Detector, VAD)

2.1. Descarga de los datos de entrenamiento

Primero vamos a descargar la lista de identificadores de los datos de entrenamiento de la práctica.

Para ello, necesitaremos descargar de Moodle el fichero training_VAD.lst, y ejecutar las siguientes líneas de código, que nos permitirán cargar el archivo a Google Colab desde el disco local:

In [ ]:
from google.colab import drive
drive.mount("/content/drive", force_remount = True)
DIR = "/content/drive/My Drive/pit/"
%cd "$DIR"
Mounted at /content/drive
/content/drive/My Drive/pit

A continuación cargamos los identificadores contenidos en el fichero en una lista en Python:

In [ ]:
file_train_list = 'training_VAD.lst' # mat files containing data + labels
f = open(file_train_list, 'r')
train_list = f.read().splitlines()
f.close()

Podemos ver algunos de ellos (los primeros 10 identificatores) de la siguiente forma:

In [ ]:
print(train_list[:10])
['features_labs_1.mat', 'features_labs_10.mat', 'features_labs_100.mat', 'features_labs_101.mat', 'features_labs_102.mat', 'features_labs_103.mat', 'features_labs_104.mat', 'features_labs_105.mat', 'features_labs_106.mat', 'features_labs_107.mat']

Ahora, descargaremos de Moodle el fichero data_download_onedrive_training_VAD.sh, y ejecutaremos las siguientes líneas de código, que nos permitirán cargar el archivo a Google Colab desde el disco local:

Para descargar el conjunto de datos desde Google drive, ejecutamos el script cargado anteriormente de la siguiente manera:

In [ ]:
!chmod 755 data_download_onedrive_training_VAD.sh
!./data_download_onedrive_training_VAD.sh
--2021-04-24 11:38:25--  https://dauam-my.sharepoint.com/:u:/g/personal/alicia_lozano_uam_es/EdCueYU7BpNAuo6BawH8hJAB5rclap745BmsPzXgSPhsgw?download=1
Resolving dauam-my.sharepoint.com (dauam-my.sharepoint.com)... 13.107.136.9
Connecting to dauam-my.sharepoint.com (dauam-my.sharepoint.com)|13.107.136.9|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: /personal/alicia_lozano_uam_es/Documents/PIT/training_VAD.zip?originalPath=aHR0cHM6Ly9kYXVhbS1teS5zaGFyZXBvaW50LmNvbS86dTovZy9wZXJzb25hbC9hbGljaWFfbG96YW5vX3VhbV9lcy9FZEN1ZVlVN0JwTkF1bzZCYXdIOGhKQUI1cmNsYXA3NDVCbXNQelhnU1Boc2d3P3J0aW1lPUhvcGplUlVIMlVn [following]
--2021-04-24 11:38:26--  https://dauam-my.sharepoint.com/personal/alicia_lozano_uam_es/Documents/PIT/training_VAD.zip?originalPath=aHR0cHM6Ly9kYXVhbS1teS5zaGFyZXBvaW50LmNvbS86dTovZy9wZXJzb25hbC9hbGljaWFfbG96YW5vX3VhbV9lcy9FZEN1ZVlVN0JwTkF1bzZCYXdIOGhKQUI1cmNsYXA3NDVCbXNQelhnU1Boc2d3P3J0aW1lPUhvcGplUlVIMlVn
Reusing existing connection to dauam-my.sharepoint.com:443.
HTTP request sent, awaiting response... 200 OK
Length: 3638232935 (3.4G) [application/x-zip-compressed]
Saving to: ‘EdCueYU7BpNAuo6BawH8hJAB5rclap745BmsPzXgSPhsgw?download=1’

EdCueYU7BpNAuo6BawH 100%[===================>]   3.39G  37.4MB/s    in 76s     

2021-04-24 11:39:42 (45.8 MB/s) - ‘EdCueYU7BpNAuo6BawH8hJAB5rclap745BmsPzXgSPhsgw?download=1’ saved [3638232935/3638232935]

Este script descargará los datos de Google Drive y los cargará en Google Colab, descomprimiéndolos en la carpeta data/training_VAD.

Podemos comprobar que los ficheros .mat se encuentran en el directorio esperado:

In [ ]:
!ls data/training_VAD/ | head
features_labs_100.mat
features_labs_101.mat
features_labs_102.mat
features_labs_103.mat
features_labs_104.mat
features_labs_105.mat
features_labs_106.mat
features_labs_107.mat
features_labs_108.mat
features_labs_109.mat

2.2. Definición del modelo

Utilizando la librería Pytorch (https://pytorch.org/docs/stable/index.html), vamos a definir un modelo de ejemplo con una capa LSTM y una capa de salida. La capa de salida estará formada por una única neurona. La salida indicará la probabilidad de voz/silencio utilizando una función sigmoide.

In [ ]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Model_1(nn.Module):
    def __init__(self, feat_dim=20):
        super(Model_1, self).__init__()

        self.lstm = nn.LSTM(feat_dim,256,batch_first=True,bidirectional=False)
        self.output = nn.Linear(256,1)

    def forward(self, x):

        out = self.lstm(x)[0]
        out = self.output(out)
        out = torch.sigmoid(out)

        return out.squeeze(-1)

PREGUNTAS:

  • ¿Qué tamaño tiene la entrada a la capa LSTM?
  • ¿Cuántas unidades (celdas) tiene dicha capa LSTM?
  • ¿Qué tipo de matriz espera la LSTM? Mirar la documentación y describir brevemente.
  • Revisar la documentación de torch.nn.LSTM y describir brevemente los argumentos _batchfirst, bidirectional y dropout.
  • En este modelo, estamos utilizando una única neurona a la salida. ¿Hay alguna otra alternativa? ¿Se seguiría utilizando una función sigmoid?
  • ¿Para qué sirve la función forward definida en la clase _Model1?

Una vez definida la clase, podemos crear nuestra instancia del modelo y cargarlo en la GPU con el siguiente código:

In [ ]:
model = Model_1(feat_dim=20)
model = model.to(torch.device("cuda"))
print(model)
Model_1(
  (lstm): LSTM(20, 256, batch_first=True)
  (output): Linear(in_features=256, out_features=1, bias=True)
)

Nuestra variable model contiene el modelo, y ya estamos listos para entrenarlo y evaluarlo.

2.3. Lectura y preparación de los datos para el entrenamiento

Como hemos visto anteriormente, nuestros datos están guardados en ficheros de Matlab (.mat). Cada uno de estos ficheros contiene una matriz X correspondiente a las secuencias de características MFCC (con sus derivadas de primer y segundo orden), y un vector Y con las etiquetas de voz/silencio correspondientes.

Veamos un ejemplo:

In [ ]:
features_file = 'data/training_VAD/features_labs_1.mat'

import scipy.io
features = scipy.io.loadmat(features_file)['X']
labels = scipy.io.loadmat(features_file)['Y']

print(features.shape)
print(labels.shape)
(46654, 60)
(46654, 1)

PREGUNTAS: Elegir un fichero de entrenamiento y responder a las siguientes preguntas:

  • ¿Qué tamaño tiene features? ¿Y labels?
  • Una de las dimensiones de la features es 60, correspondiente a los 20 coeficientes MFCC concatenados con las derivadas de primer y segundo orden. ¿Con qué se corresponde la otra dimensión?

El entrenamiento del modelo se va a realizar mediante descenso por gradiente (o alguna de sus variantes) basado en batches.

Para preparar cada uno de estos batches que servirán de entrada a nuestro modelo LSTM, debemos almacenar las características en secuencias de la misma longitud. El siguiente código lee las características (get_fea) y sus correspondientes etiquetas (get_lab) de un fragmento aleatorio del fichero de entrada.

In [ ]:
import scipy.io
import numpy as np

def get_fea(segment, rand_idx):
    data = scipy.io.loadmat(segment)['X']
    if data.shape[0] <= length_segments:
        start_frame = 0
    else:
        start_frame = np.random.permutation(data.shape[0]-length_segments)[0]

    end_frame = np.min((start_frame + length_segments,data.shape[0]))
    rand_idx[segment] = start_frame
    feat = data[start_frame:end_frame,:20] # discard D and DD, just 20 MFCCs
    return feat[np.newaxis, :, :]   


def get_lab(segment, rand_idx):
    data = scipy.io.loadmat(segment)['Y']
    start_frame = rand_idx[segment]
    end_frame = np.min((start_frame + length_segments, data.shape[0]))
    labs = data[start_frame:end_frame].flatten()
    return labs[np.newaxis,:]

PREGUNTAS: Analizar las funciones anteriores detenidamente y responder a las siguientes cuestiones:

  • ¿De qué tamaño son los fragmentos que se están leyendo?
  • ¿Para qué sirve _randidx?

2.4. Entrenamiento del modelo

Una vez definidas las funciones de lectura de datos y preparación del formato que necesitamos para la entrada a la red LSTM, podemos utilizar el siguiente código para entrenarlo.

In [ ]:
length_segments = 300 
path_in_feat = 'data/training_VAD/'

from torch import optim

criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

batch_size = 51
segment_sets = np.array_split(train_list, len(train_list)/batch_size)

accs, losses = [], []

max_iters = 5
for epoch in range(1, max_iters):
  print('Epoch:', epoch)
  
  model.train()
  cache_loss = 0
  y_pred, y_true = [], []

  for ii, segment_set in enumerate(segment_sets):
    rand_idx = {}
    optimizer.zero_grad()

    # Create training batches
    train_batch = np.vstack([get_fea(path_in_feat + segment, rand_idx) for segment in segment_set])
    labs_batch = np.vstack([get_lab(path_in_feat + segment, rand_idx).astype(np.int16) for segment in segment_set])
    assert len(labs_batch) == len(train_batch) # make sure that all frames have defined label

    # Shuffle the data and place them into Pytorch tensors
    shuffle = np.random.permutation(len(labs_batch))
    labs_batch = torch.tensor(labs_batch.take(shuffle, axis=0).astype("float32")).to(torch.device("cuda"))
    train_batch = torch.tensor(train_batch.take(shuffle, axis=0).astype("float32")).to(torch.device("cuda"))

    #print(f"  Mini-batch {ii+1}")

    # Forward the data through the network
    outputs = model(train_batch)

    # Compute cost
    loss = criterion(outputs, labs_batch)

    # Backward step
    loss.backward()
    optimizer.step()
    cache_loss += loss.item()

    # Save
    preds = outputs.cpu().detach().numpy().flatten()
    y_pred.extend([1 if p >= 0.5 else 0 for p in preds])
    y_true.extend(labs_batch.cpu().numpy().flatten())

  y_pred = np.array(y_pred, dtype=np.float32)
  y_true = np.array(y_true, dtype=np.float32)
  acc = np.mean(y_true == y_pred)
  loss = cache_loss/(ii+1)

  print(f"  Loss: {loss:.5f}")
  print(f"  Accuracy: {acc:.3f}")

  accs.append(acc)
  losses.append(loss)
Epoch: 1
  Loss: 0.61078
  Accuracy: 0.710
Epoch: 2
  Loss: 0.44381
  Accuracy: 0.811
Epoch: 3
  Loss: 0.32235
  Accuracy: 0.870
Epoch: 4
  Loss: 0.28559
  Accuracy: 0.888
In [ ]:
import matplotlib.pyplot as plt
epochs = np.arange(max_iters - 1) + 1
plt.plot(epochs, losses, "-o", label="Loss")
plt.plot(epochs, accs, "-o", label ="Accuracy")
plt.xticks(epochs)
plt.xlabel("Epochs")
plt.legend()
plt.show()

PREGUNTAS: Analizar el código anterior cuidadosamente y ejecutarlo. A continuación, responder a las siguientes cuestiones:

  • ¿Qué función de coste se está optimizando? Describir brevemente con ayuda de la documentación.
  • ¿Qué optimizador se ha definido?
  • ¿Para qué se utiliza _batchsize?
  • Describir brevemente la creación de los batches.
  • ¿Qué línea de código realiza el forward pass?
  • ¿Qué línea de código realiza el backward pass?
  • ¿Cuántas iteraciones del algoritmo ha realizado? ¿Qué observa en la evolución de la función de coste?
  • Añada al código el cálculo de la precisión o accuracy, de tal manera que se muestre por pantalla dicho valor en cada iteración (similar a lo que ocurre con el valor del coste loss). Copiar el código en el informe y describir brevemente.
  • ¿Qué valor de coste y accuracy obtiene? ¿Cómo se puede mejorar?

2.5. Evaluación del modelo

Una vez entrenado el modelo, vamos a evaluarlo en un ejemplo en concreto.

Descargue de Moodle el fichero audio_sample_test.wav, con sus correspondientes características y etiquetas audio_sample_test.mat y evalúe el rendimiento en el mismo.

In [ ]:
from google.colab import files
uploaded = files.upload()
Upload widget is only available when the cell has been executed in the current browser session. Please rerun this cell to enable.
Saving audio_sample_test.wav to audio_sample_test.wav
Saving audio_sample_test.mat to audio_sample_test.mat
In [ ]:
features_file = 'audio_sample_test.mat'

import scipy.io
features = scipy.io.loadmat(features_file)['X'][:, :20]
labels = scipy.io.loadmat(features_file)['Y'].reshape(-1).astype("float32")

print(features.shape)
print(labels.shape)
(57777, 20)
(57777,)
In [ ]:
def get_predictions(features):
    model.eval()
    with torch.no_grad():
        batch = features[np.newaxis, :, :]
        batch = torch.tensor(batch.astype("float32")).to(torch.device("cuda"))

        # predict probabilities of voice for each frame
        # shape of input is (1, 57777, 20)
        preds = model(batch)

    return preds.cpu().numpy()[0]
In [ ]:
# get predictions
probs = get_predictions(features)

# get the predicted labels from the model's predictions
pred_labels = np.array([1 if p >= 0.5 else 0 for p in probs])

print(pred_labels.shape)
print(labels.shape)
(57777,)
(57777,)
In [ ]:
# Get accuracy
acc = np.mean(labels == pred_labels)

print(f'Accuracy: {acc:.3f}')
Accuracy: 0.909
In [ ]:
# save predictions
np.savez("preds_P1.npz", preds=pred_labels)
In [ ]:
# load predictions
#with np.load('preds_P1.npz') as data:
#    pred_labels = data['preds']
In [ ]:
wav_file_name = "audio_sample_test.wav"
fs, signal = read_recording(wav_file_name)
print("Signal shape: " + str(signal.shape))
print("Sample rate: " + str(fs))
Signal shape: (4800160,)
Sample rate: 8000
In [ ]:
segment = 1
n_seconds = 10  # only 10 seconds
n_labels_segment = n_seconds*100  # one label every 10 ms

signal_segment = signal[segment*n_seconds*fs:(segment + 1)*n_seconds*fs]
labels_segment = labels[segment*n_labels_segment:(segment + 1)*n_labels_segment]
pred_labels_segment = pred_labels[segment*n_labels_segment:(segment + 1)*n_labels_segment]

print(f"Número de muestras de los {n_seconds} segundos de la señal:", len(signal_segment))
Número de muestras de los 10 segundos de la señal: 80000
In [ ]:
def plot_labels(labels, color='tab:orange'):
    t = np.arange(n_seconds*segment, (segment + 1)*n_seconds, 1./fs)
    labels_range = np.linspace(segment*n_seconds, (segment + 1)*n_seconds, num=n_labels_segment)

    plt.plot(t, signal_segment)
    plt.plot(labels_range, 2*labels - 1, color=color)

    plt.title(wav_file_name)
    plt.xlabel("Time (seconds)")
    plt.ylabel("Amplitude")
    plt.grid(True)
    plt.xticks(np.arange(n_seconds*segment, (segment + 1)*n_seconds + 2, 2))
    plt.xlim([t[0]-1e-3, t[-1]+1e-3])
    plt.show()
In [ ]:
plot_labels(labels_segment)
In [ ]:
plot_labels(pred_labels_segment, color="green")
In [ ]:
import IPython

print(wav_file_name)
IPython.display.Audio(wav_file_name)
audio_sample_test.wav
Out[ ]:

PREGUNTAS:

  • Incluya en el informe de la práctica el código que ha utilizado para evaluar dicho fichero.
  • ¿Cuál es el accuracy obtenido para el fichero audio_sample_test?
  • Represente 10 segundos de dicho audio, así como sus etiquetas de _groundtruth y las obtenidas con su modelo. Incluya dicha gráfica en el informe y comente brevemente el resultado. Visualmente, ¿es bueno el modelo?
  • Escuche el audio y comente cualitativamente cómo es de bueno o malo el modelo.